08d7125cd52d9eb33a6e8c27c2007193f83373ea,modules/elasticsearch/src/main/java/org/elasticsearch/indexer/routing/IndexersRouter.java,IndexersRouter,clusterChanged,#ClusterChangedEvent#,45
Before Change
@Override public void clusterChanged(final ClusterChangedEvent event) {
if (event.nodesChanged()) {
indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) {
return null; //To change body of implemented methods use File | Settings | File Templates.
}
});
}
}
}
After Change
return;
}
if (event.nodesChanged() || event.metaDataChanged()) {
indexerClusterService.submitStateUpdateTask("reroute_indexers_node_changed", new IndexerClusterStateUpdateTask() {
@Override public IndexerClusterState execute(IndexerClusterState currentState) {
if (!event.state().metaData().hasIndex(indexerIndexName)) {
// if there are routings, publish an empty one (so it will be deleted on nodes), otherwise, return the same state
if (!currentState.routing().isEmpty()) {
return IndexerClusterState.builder().state(currentState).routing(IndexersRouting.builder()).build();
}
return currentState;
}
IndexersRouting.Builder routingBuilder = IndexersRouting.builder().routing(currentState.routing());
boolean dirty = false;
IndexMetaData indexMetaData = event.state().metaData().index(indexerIndexName);
// go over and create new indexer routing (with no node) for new types (indexers names)
for (Map.Entry<String, CompressedString> entry : indexMetaData.mappings().entrySet()) {
String mappingType = entry.getKey(); // mapping type is the name of the indexer
if (!currentState.routing().hasIndexerByName(mappingType)) {
// no indexer, we need to add it to the routing with no node allocation
try {
GetResponse getResponse = client.prepareGet(indexerIndexName, mappingType, "_meta").execute().actionGet();
if (getResponse.exists()) {
String indexerType = XContentMapValues.nodeStringValue(getResponse.sourceAsMap().get("type"), null);
if (indexerType == null) {
logger.warn("no indexer type provided for [{}], ignoring...", indexerIndexName);
} else {
routingBuilder.put(new IndexerRouting(new IndexerName(mappingType, indexerType), IndexerRoutingState.UNASSIGNED, null));
dirty = true;
}
}
} catch (Exception e) {
logger.warn("failed to get/parse _meta for [{}]", mappingType);
}
}
}
// now, remove routings that were deleted
for (IndexerRouting routing : currentState.routing()) {
if (!indexMetaData.mappings().containsKey(routing.indexerName().name())) {
routingBuilder.remove(routing);
dirty = true;
}
}
// now, allocate indexers
// see if we can relocate indexers (we can simply first unassign then, then publish) and then, next round, they will be assigned
// but, we need to make sure that there will *be* next round of this is the logic
if (dirty) {
return IndexerClusterState.builder().state(currentState).routing(routingBuilder).build();
}
return currentState;
}
});
}
}
}